3760bf9cc3f9ad948486e706d21e724656a34e15,enterprise/main/java/org/neo4j/kernel/ha/MasterClient.java,MasterClient,sendRequest,#RequestType#SlaveContext#Serializer#Deserializer#,127
Before Change
@SuppressWarnings( "unchecked" )
BlockingReadHandler<ChannelBuffer> reader = (BlockingReadHandler<ChannelBuffer>)
channel.getPipeline().get( "blockingHandler" );
Pair<ChannelBuffer, Boolean> messageContext = readNextMessage( channelContext, reader );
ChannelBuffer message = messageContext.first();
T response = deserializer.read( message );
String[] datasources = type.includesSlaveContext() ? readTransactionStreamHeader( message ) : null;
if ( messageContext.other() )
{
// This message consists of multiple chunks, apply transactions as early as possible
message = createDynamicBufferFrom( message );
boolean more = true;
while ( more )
{
Pair<ChannelBuffer, Boolean> followingMessage = readNextMessage( channelContext, reader );
more = followingMessage.other();
message.writeBytes( followingMessage.first() );
message = applyFullyAvailableTransactions( datasources, message );
}
}
// Here's the remaining transactions if the message consisted of multiple chunks,
// or all transactions if it only consisted of one chunk.
TransactionStream txStreams = type.includesSlaveContext() ?
readTransactionStreams( datasources, message ) : TransactionStream.EMPTY;
return new Response<T>( response, txStreams );
}
catch ( ClosedChannelException e )
After Change
// Read the response
@SuppressWarnings( "unchecked" )
BlockingReadHandler<ChannelBuffer> reader = (BlockingReadHandler<ChannelBuffer>)
channel.getPipeline().get( "blockingHandler" );
final Triplet<Channel, ChannelBuffer, ByteBuffer> finalChannelContext = channelContext;
DechunkingChannelBuffer dechunkingBuffer = new DechunkingChannelBuffer( ChannelBuffers.dynamicBuffer(), reader )
{
/**
 * Reads the next chunk through the superclass implementation and refuses
 * to hand a missing chunk on to the caller.
 *
 * @return the chunk returned by {@code super.readNext()}, never {@code null}
 * @throws HaCommunicationException if the superclass returned {@code null};
 *         the channel context is disposed via the channel pool first
 */
@Override
protected ChannelBuffer readNext()
{
    ChannelBuffer nextChunk = super.readNext();
    if ( nextChunk != null )
    {
        return nextChunk;
    }

    // No chunk was delivered: hand the channel context to the pool for
    // disposal before reporting the failure to the caller.
    channelPool.dispose( finalChannelContext );
    throw new HaCommunicationException( "Channel has been closed" );
}
};
T response = deserializer.read( dechunkingBuffer );
String[] datasources = type.includesSlaveContext() ? readTransactionStreamHeader( dechunkingBuffer ) : null;
while ( dechunkingBuffer.expectsMoreChunks() )
{
applyFullyAvailableTransactions( datasources, dechunkingBuffer );
if ( dechunkingBuffer.expectsMoreChunks() )
{
dechunkingBuffer.forceReadNextChunk();
}
}
// Here's the remaining transactions if the message consisted of multiple chunks,
// or all transactions if it only consisted of one chunk.
TransactionStream txStreams = type.includesSlaveContext() ?
readTransactionStreams( datasources, dechunkingBuffer ) : TransactionStream.EMPTY;
return new Response<T>( response, txStreams );
}
catch ( ClosedChannelException e )